{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Azure ML & Azure Databricks notebooks by Parashar Shah.\n",
        "\n",
        "Copyright (c) Microsoft Corporation. All rights reserved.\n",
        "\n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#Model Building"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import os\n",
        "import pprint\n",
        "import numpy as np\n",
        "\n",
        "from pyspark.ml import Pipeline, PipelineModel\n",
        "from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler\n",
        "from pyspark.ml.classification import LogisticRegression\n",
        "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
        "from pyspark.ml.tuning import CrossValidator, ParamGridBuilder"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import azureml.core\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Set auth to be used by workspace related APIs.\n",
        "# For automation or CI/CD ServicePrincipalAuthentication can be used.\n",
        "# https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.authentication.serviceprincipalauthentication?view=azure-ml-py\n",
        "auth = None"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# import the Workspace class and check the azureml SDK version\n",
        "from azureml.core import Workspace\n",
        "\n",
        "ws = Workspace.from_config(auth = auth)\n",
        "print('Workspace name: ' + ws.name, \n",
        "      'Azure region: ' + ws.location, \n",
        "      'Subscription id: ' + ws.subscription_id, \n",
        "      'Resource group: ' + ws.resource_group, sep = '\\n')"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "#get the train and test datasets\n",
        "train_data_path = \"AdultCensusIncomeTrain\"\n",
        "test_data_path = \"AdultCensusIncomeTest\"\n",
        "\n",
        "train = spark.read.parquet(train_data_path)\n",
        "test = spark.read.parquet(test_data_path)\n",
        "\n",
        "print(\"train: ({}, {})\".format(train.count(), len(train.columns)))\n",
        "print(\"test: ({}, {})\".format(test.count(), len(test.columns)))\n",
        "\n",
        "train.printSchema()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#Define Model"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "label = \"income\"\n",
        "dtypes = dict(train.dtypes)\n",
        "dtypes.pop(label)\n",
        "\n",
        "si_xvars = []\n",
        "ohe_xvars = []\n",
        "featureCols = []\n",
        "for idx,key in enumerate(dtypes):\n",
        "    if dtypes[key] == \"string\":\n",
        "        featureCol = \"-\".join([key, \"encoded\"])\n",
        "        featureCols.append(featureCol)\n",
        "        \n",
        "        tmpCol = \"-\".join([key, \"tmp\"])\n",
        "        # string-index and one-hot encode the string column\n",
        "        #https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html\n",
        "        #handleInvalid: Param for how to handle invalid data (unseen labels or NULL values). \n",
        "        #Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), \n",
        "        #or 'keep' (put invalid data in a special additional bucket, at index numLabels). Default: \"error\"\n",
        "        si_xvars.append(StringIndexer(inputCol=key, outputCol=tmpCol, handleInvalid=\"skip\"))\n",
        "        ohe_xvars.append(OneHotEncoder(inputCol=tmpCol, outputCol=featureCol))\n",
        "    else:\n",
        "        featureCols.append(key)\n",
        "\n",
        "# string-index the label column into a column named \"label\"\n",
        "si_label = StringIndexer(inputCol=label, outputCol='label')\n",
        "\n",
        "# assemble the encoded feature columns in to a column named \"features\"\n",
        "assembler = VectorAssembler(inputCols=featureCols, outputCol=\"features\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.run import Run\n",
        "from azureml.core.experiment import Experiment\n",
        "import numpy as np\n",
        "import os\n",
        "import shutil\n",
        "\n",
        "model_name = \"AdultCensus_runHistory.mml\"\n",
        "model_dbfs = os.path.join(\"/dbfs\", model_name)\n",
        "run_history_name = 'spark-ml-notebook'\n",
        "\n",
        "# start a training run by defining an experiment\n",
        "myexperiment = Experiment(ws, \"Ignite_AI_Talk\")\n",
        "root_run = myexperiment.start_logging()\n",
        "\n",
        "# Regularization Rates - \n",
        "regs = [0.0001, 0.001, 0.01, 0.1]\n",
        " \n",
        "# try a bunch of regularization rate in a Logistic Regression model\n",
        "for reg in regs:\n",
        "    print(\"Regularization rate: {}\".format(reg))\n",
        "    # create a bunch of child runs\n",
        "    with root_run.child_run(\"reg-\" + str(reg)) as run:\n",
        "        # create a new Logistic Regression model.\n",
        "        lr = LogisticRegression(regParam=reg)\n",
        "        \n",
        "        # put together the pipeline\n",
        "        pipe = Pipeline(stages=[*si_xvars, *ohe_xvars, si_label, assembler, lr])\n",
        "\n",
        "        # train the model\n",
        "        model_p = pipe.fit(train)\n",
        "        \n",
        "        # make prediction\n",
        "        pred = model_p.transform(test)\n",
        "        \n",
        "        # evaluate. note only 2 metrics are supported out of the box by Spark ML.\n",
        "        bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')\n",
        "        au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)\n",
        "        au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)\n",
        "\n",
        "        print(\"Area under ROC: {}\".format(au_roc))\n",
        "        print(\"Area Under PR: {}\".format(au_prc))\n",
        "      \n",
        "        # log reg, au_roc, au_prc and feature names in run history\n",
        "        run.log(\"reg\", reg)\n",
        "        run.log(\"au_roc\", au_roc)\n",
        "        run.log(\"au_prc\", au_prc)\n",
        "        run.log_list(\"columns\", train.columns)\n",
        "\n",
        "        # save model\n",
        "        model_p.write().overwrite().save(model_name)\n",
        "        \n",
        "        # upload the serialized model into run history record\n",
        "        mdl, ext = model_name.split(\".\")\n",
        "        model_zip = mdl + \".zip\"\n",
        "        shutil.make_archive(mdl, 'zip', model_dbfs)\n",
        "        run.upload_file(\"outputs/\" + model_name, model_zip)        \n",
        "        #run.upload_file(\"outputs/\" + model_name, path_or_stream = model_dbfs) #cannot deal with folders\n",
        "\n",
        "        # now delete the serialized model from local folder since it is already uploaded to run history \n",
        "        shutil.rmtree(model_dbfs)\n",
        "        os.remove(model_zip)\n",
        "        \n",
        "# Declare run completed\n",
        "root_run.complete()\n",
        "root_run_id = root_run.id\n",
        "print (\"run id:\", root_run.id)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "metrics = root_run.get_metrics(recursive=True)\n",
        "best_run_id = max(metrics, key = lambda k: metrics[k]['au_roc'])\n",
        "print(best_run_id, metrics[best_run_id]['au_roc'], metrics[best_run_id]['reg'])"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "#Get the best run\n",
        "child_runs = {}\n",
        "\n",
        "for r in root_run.get_children():\n",
        "    child_runs[r.id] = r\n",
        "   \n",
        "best_run = child_runs[best_run_id]"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "#Download the model from the best run to a local folder\n",
        "best_model_file_name = \"best_model.zip\"\n",
        "best_run.download_file(name = 'outputs/' + model_name, output_file_path = best_model_file_name)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#Model Evaluation"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "##unzip the model to dbfs (as load() seems to require that) and load it.\n",
        "if os.path.isfile(model_dbfs) or os.path.isdir(model_dbfs):\n",
        "    shutil.rmtree(model_dbfs)\n",
        "shutil.unpack_archive(best_model_file_name, model_dbfs)\n",
        "\n",
        "model_p_best = PipelineModel.load(model_name)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# make prediction\n",
        "pred = model_p_best.transform(test)\n",
        "output = pred[['hours_per_week','age','workclass','marital_status','income','prediction']]\n",
        "display(output.limit(5))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# evaluate. note only 2 metrics are supported out of the box by Spark ML.\n",
        "bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')\n",
        "au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)\n",
        "au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)\n",
        "\n",
        "print(\"Area under ROC: {}\".format(au_roc))\n",
        "print(\"Area Under PR: {}\".format(au_prc))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#Model Persistence"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "##NOTE: by default the model is saved to and loaded from /dbfs/ instead of cwd!\n",
        "model_p_best.write().overwrite().save(model_name)\n",
        "print(\"saved model to {}\".format(model_dbfs))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "%sh\n",
        "\n",
        "ls -la /dbfs/AdultCensus_runHistory.mml/*"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "dbutils.notebook.exit(\"success\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/azure-databricks/amlsdk/build-model-run-history-03.png)"
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "pasha"
      }
    ],
    "kernelspec": {
      "display_name": "Python 3.6",
      "language": "python",
      "name": "python36"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.6"
    },
    "name": "build-model-run-history-03",
    "notebookId": 3836944406456339
  },
  "nbformat": 4,
  "nbformat_minor": 1
}